package com.alibaba.alink.operator.common.recommendation;

import com.alibaba.alink.common.MTable;
import com.alibaba.alink.common.exceptions.AkUnsupportedOperationException;
import com.alibaba.alink.common.utils.JsonConverter;
import com.alibaba.alink.operator.common.evaluation.EvaluationUtil;
import com.alibaba.alink.operator.common.io.types.FlinkTypeConverter;
import com.alibaba.alink.params.recommendation.BaseSimilarItemsRecommParams;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.ml.api.misc.param.ParamInfo;
import org.apache.flink.ml.api.misc.param.ParamInfoFactory;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;

import java.util.HashMap;
import java.util.List;

/**
 * Recommendation kernel backed by a Swing model.
 *
 * <p>Only {@link RecommType#SIMILAR_ITEMS} is supported. Every other recommendation entry point throws
 * {@link AkUnsupportedOperationException}.
 *
 * <p>The model is a list of rows. A row whose first field is {@code null} is treated as the meta row and
 * carries a JSON-encoded {@link Params} in its second field; every other row maps an item (first field)
 * to a JSON-encoded {@link SwingResData} (second field).
 */
public class SwingRecommKernel extends RecommKernel implements Cloneable {
	/** Item -> recommendation data for that item. Populated by {@link #loadModel(List)}. */
	private HashMap <Comparable <?>, SwingResData> itemRecomm;
	/** Number of similar items to return, read from {@link BaseSimilarItemsRecommParams#K}. */
	private final Integer topN;
	/** Item type read from the model meta row; stays {@code null} when the model has no meta row. */
	private TypeInformation itemType;

	/** Meta parameter holding the item type name stored in the model's meta row. */
	public static ParamInfo <String> ITEM_TYPE = ParamInfoFactory
		.createParamInfo("itemType", String.class)
		.setDescription("itemType")
		.setRequired()
		.build();

	/**
	 * Creates a kernel for the given recommendation type.
	 *
	 * @param modelSchema schema of the model table; the type of its first field is used as the
	 *                    recommended object type
	 * @param dataSchema  schema of the data to predict on
	 * @param params      parameters; {@link BaseSimilarItemsRecommParams#K} is read from them
	 * @param recommType  requested recommendation type, must be {@link RecommType#SIMILAR_ITEMS}
	 * @throws AkUnsupportedOperationException if {@code recommType} is not {@code SIMILAR_ITEMS}
	 */
	public SwingRecommKernel(TableSchema modelSchema, TableSchema dataSchema, Params params, RecommType recommType) {
		super(modelSchema, dataSchema, params, recommType);
		if (recommType == RecommType.SIMILAR_ITEMS) {
			this.topN = this.params.get(BaseSimilarItemsRecommParams.K);
			this.recommObjType = modelSchema.getFieldTypes()[0];
		} else {
			// The message previously named "ItemKnn", a copy-paste leftover from another kernel.
			throw new AkUnsupportedOperationException("Swing not support " + recommType + " yet!");
		}
	}

	/**
	 * Builds the item -> recommendation lookup table from the model rows.
	 *
	 * <p>The meta row (first field {@code null}) may appear anywhere in the list, so the rows are scanned
	 * twice: once to find the item type, once to load the data rows.
	 *
	 * @param modelRows all rows of the model
	 */
	@Override
	public void loadModel(List <Row> modelRows) {
		int itemNumber = modelRows.size();
		for (Row modelRow : modelRows) {
			if (modelRow.getField(0) == null) {
				Params meta = Params.fromJson((String) modelRow.getField(1));
				itemType = FlinkTypeConverter.getFlinkType(meta.get(ITEM_TYPE));
				// The meta row is not an item, so it does not count towards the map size.
				itemNumber = modelRows.size() - 1;
				break;
			}
		}

		// Loop-invariant: recommended objects are cast only when an item type is known
		// and it is neither STRING nor INT.
		final boolean needsCast = itemType != null
			&& !itemType.equals(Types.STRING)
			&& !itemType.equals(Types.INT);

		itemRecomm = new HashMap <>(itemNumber);
		for (Row modelRow : modelRows) {
			Object item = modelRow.getField(0);
			if (item == null) {
				// Meta row, already handled above.
				continue;
			}
			String jsonModel = (String) modelRow.getField(1);
			SwingResData data = JsonConverter.fromJson(jsonModel, SwingResData.class);
			if (needsCast) {
				Object[] objects = data.getObject();
				for (int i = 0; i < objects.length; i++) {
					objects[i] = EvaluationUtil.castTo(objects[i], itemType);
				}
				data.setObject(objects);
			}
			itemRecomm.put((Comparable <?>) item, data);
		}
	}

	/**
	 * Not supported by Swing.
	 *
	 * @throws AkUnsupportedOperationException always
	 */
	@Override
	public Double rate(Object[] infoUserItem) {
		throw new AkUnsupportedOperationException("swing not support rate.");
	}

	/**
	 * Not supported by Swing.
	 *
	 * @throws AkUnsupportedOperationException always
	 */
	@Override
	public MTable recommendItemsPerUser(Object userId) {
		throw new AkUnsupportedOperationException("Swing not support recommendItemsPerUser.");
	}

	/**
	 * Not supported by Swing.
	 *
	 * @throws AkUnsupportedOperationException always
	 */
	@Override
	public MTable recommendUsersPerItem(Object itemId) {
		throw new AkUnsupportedOperationException("Swing not support recommendUsersPerItem.");
	}

	/**
	 * Returns the top-N data stored in the model for the given item.
	 *
	 * @param itemId the item to look up; it is cast to {@link Comparable}
	 * @return the result of {@code SwingResData#returnTopNData} for the item, or {@code null} when the
	 * model holds no data for it
	 */
	@Override
	public MTable recommendSimilarItems(Object itemId) {
		Comparable <?> mainItem = (Comparable <?>) itemId;
		// Single lookup; a missing key and a null value are both reported as "no recommendation".
		SwingResData items = itemRecomm.get(mainItem);
		if (items == null) {
			return null;
		}
		return items.returnTopNData(topN, recommObjType);
	}

	/**
	 * Not supported by Swing.
	 *
	 * @throws AkUnsupportedOperationException always
	 */
	@Override
	public MTable recommendSimilarUsers(Object userId) {
		throw new AkUnsupportedOperationException("Swing not support recommendSimilarUsers.");
	}

	/**
	 * Creates a fresh kernel with the same schemas, a clone of the params and the same recommendation
	 * type. The loaded model is not copied; the new kernel has no model until
	 * {@link #loadModel(List)} is called on it.
	 */
	@Override
	public RecommKernel createNew() {
		return new SwingRecommKernel(getModelSchema(), getDataSchema(), params.clone(), recommType);
	}
}